// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements a `CandidateBackingSubsystem`.

#![recursion_limit="256"]

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use bitvec::vec::BitVec;
use log;
use futures::{
	select, FutureExt, SinkExt, StreamExt,
	channel::{oneshot, mpsc},
	future::{self, Either},
	task::{Spawn, SpawnError, SpawnExt},
};
use futures_timer::Delay;
use streamunordered::{StreamUnordered, StreamYield};

use primitives::Pair;
use keystore::KeyStorePtr;
use polkadot_primitives::v1::{
	CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorPair, ValidatorId,
	ValidatorIndex, SigningContext, PoV, OmittedValidationData,
	CandidateDescriptor, AvailableData, ErasureChunk, ValidatorSignature, Hash, CandidateReceipt,
	CandidateCommitments,
};
use polkadot_node_primitives::{
	FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
	ValidationOutputs,
};
use polkadot_subsystem::{
	FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem,
};
use polkadot_subsystem::messages::{
	AllMessages, CandidateBackingMessage, CandidateSelectionMessage, SchedulerRoster,
	RuntimeApiMessage, RuntimeApiRequest, CandidateValidationMessage, ValidationFailed,
	StatementDistributionMessage, NewBackedCandidate, ProvisionerMessage, ProvisionableData,
	PoVDistributionMessage, AvailabilityStoreMessage,
};
use statement_table::{
	generic::AttestedCandidate as TableAttestedCandidate,
	Context as TableContextTrait,
	Table,
	v1::{
		Statement as TableStatement,
		SignedStatement as TableSignedStatement, Summary as TableSummary,
	},
};

#[derive(Debug, derive_more::From)]
enum Error {
	NotInValidatorSet,
	CandidateNotFound,
	JobNotFound(Hash),
	InvalidSignature,
	#[from]
	Erasure(erasure_coding::Error),
	#[from]
	ValidationFailed(ValidationFailed),
	#[from]
	Oneshot(oneshot::Canceled),
	#[from]
	Mpsc(mpsc::SendError),
	#[from]
	Spawn(SpawnError),
}

/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
	/// The hash of the relay parent on top of which this job is doing it's work.
	parent: Hash,
	/// Inbound message channel receiving part.
	rx_to: mpsc::Receiver<ToJob>,
	/// Outbound message channel sending part.
	tx_from: mpsc::Sender<FromJob>,

	/// The `ParaId`s assigned to this validator.
	assignment: ParaId,
	/// We issued `Valid` or `Invalid` statements on about these candidates.
	issued_statements: HashSet<Hash>,
	/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
	seconded: Option<Hash>,
	/// We have already reported misbehaviors for these validators.
	reported_misbehavior_for: HashSet<ValidatorIndex>,

	table: Table<TableContext>,
	table_context: TableContext,
}

const fn group_quorum(n_validators: usize) -> usize {
	(n_validators / 2) + 1
}

#[derive(Default)]
struct TableContext {
	signing_context: SigningContext,
	key: Option<ValidatorPair>,
	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
	validators: Vec<ValidatorId>,
}

impl TableContextTrait for TableContext {
	type AuthorityId = ValidatorIndex;
	type Digest = Hash;
	type GroupId = ParaId;
	type Signature = ValidatorSignature;
	type Candidate = CommittedCandidateReceipt;

	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> Hash {
		candidate.hash()
	}

	fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
		candidate.descriptor().para_id
	}

	fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
		self.groups.get(group).map_or(false, |g| g.iter().position(|a| a == authority).is_some())
	}

	fn requisite_votes(&self, group: &ParaId) -> usize {
		self.groups.get(group).map_or(usize::max_value(), |g| group_quorum(g.len()))
	}
}

impl TableContext {
	fn local_id(&self) -> Option<ValidatorId> {
		self.key.as_ref().map(|k| k.public())
	}

	fn local_index(&self) -> Option<ValidatorIndex> {
		self.local_id().and_then(|id|
			self.validators
				.iter()
				.enumerate()
				.find(|(_, k)| k == &&id)
				.map(|(i, _)| i as ValidatorIndex)
		)
	}
}

const CHANNEL_CAPACITY: usize = 64;

/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
enum ToJob {
	/// A `CandidateBackingMessage`.
	CandidateBacking(CandidateBackingMessage),
	/// Stop working.
	Stop,
}

/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
enum FromJob {
	AvailabilityStore(AvailabilityStoreMessage),
	RuntimeApiMessage(RuntimeApiMessage),
	CandidateValidation(CandidateValidationMessage),
	CandidateSelection(CandidateSelectionMessage),
	Provisioner(ProvisionerMessage),
	PoVDistribution(PoVDistributionMessage),
	StatementDistribution(StatementDistributionMessage),
}

impl From<FromJob> for AllMessages {
	fn from(f: FromJob) -> Self {
		match f {
			FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg),
			FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg),
			FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg),
			FromJob::CandidateSelection(msg) => AllMessages::CandidateSelection(msg),
			FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg),
			FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg),
			FromJob::Provisioner(msg) => AllMessages::Provisioner(msg),
		}
	}
}

// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
	let statement = match s.payload() {
		Statement::Seconded(c) => TableStatement::Candidate(c.clone()),
		Statement::Valid(h) => TableStatement::Valid(h.clone()),
		Statement::Invalid(h) => TableStatement::Invalid(h.clone()),
	};

	TableSignedStatement {
		statement,
		signature: s.signature().clone(),
		sender: s.validator_index(),
	}
}

// finds the first key we are capable of signing with out of the given set of validators,
// if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
	let keystore = keystore.read();
	validators.iter()
		.find_map(|v| {
			keystore.key_pair::<ValidatorPair>(&v).ok()
		})
}

impl CandidateBackingJob {
	/// Run asynchronously.
	async fn run(mut self) -> Result<(), Error> {
		while let Some(msg) = self.rx_to.next().await {
			match msg {
				ToJob::CandidateBacking(msg) => {
					self.process_msg(msg).await?;
				}
				_ => break,
			}
		}

		Ok(())
	}

	async fn issue_candidate_invalid_message(
		&mut self,
		candidate: CandidateReceipt,
	) -> Result<(), Error> {
		self.tx_from.send(FromJob::CandidateSelection(
			CandidateSelectionMessage::Invalid(self.parent, candidate)
		)).await?;

		Ok(())
	}

	/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
	///
	/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
	async fn validate_and_second(
		&mut self,
		candidate: &CandidateReceipt,
		pov: PoV,
	) -> Result<bool, Error> {
		let valid = self.request_candidate_validation(
			candidate.descriptor().clone(),
			Arc::new(pov.clone()),
		).await?;

		let candidate_hash = candidate.hash();

		let statement = match valid {
			ValidationResult::Valid(outputs) => {
				// make PoV available for later distribution. Send data to the availability
				// store to keep. Sign and dispatch `valid` statement to network if we
				// have not seconded the given candidate.
				//
				// If the commitments hash produced by validation is not the same as given by
				// the collator, do not make available and report the collator.
				let commitments_check = self.make_pov_available(
					pov,
					outputs,
					|commitments| if commitments.hash() == candidate.commitments_hash {
						Ok(CommittedCandidateReceipt {
							descriptor: candidate.descriptor().clone(),
							commitments,
						})
					} else {
						Err(())
					},
				).await?;

				match commitments_check {
					Ok(candidate) => {
						self.issued_statements.insert(candidate_hash);
						Some(Statement::Seconded(candidate))
					}
					Err(()) => {
						self.issue_candidate_invalid_message(candidate.clone()).await?;
						None
					}
				}
			}
			ValidationResult::Invalid => {
				// no need to issue a statement about this if we aren't seconding it.
				//
				// there's an infinite amount of garbage out there. no need to acknowledge
				// all of it.
				self.issue_candidate_invalid_message(candidate.clone()).await?;
				None
			}
		};

		let issued_statement = statement.is_some();
		if let Some(signed_statement) = statement.and_then(|s| self.sign_statement(s)) {
			self.import_statement(&signed_statement).await?;
			self.distribute_signed_statement(signed_statement).await?;
		}

		Ok(issued_statement)
	}

	fn get_backed(&self) -> Vec<NewBackedCandidate> {
		let proposed = self.table.proposed_candidates(&self.table_context);
		let mut res = Vec::with_capacity(proposed.len());

		for p in proposed.into_iter() {
			let TableAttestedCandidate { candidate, validity_votes, .. } = p;

			let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
						.into_iter()
						.map(|(id, vote)| (id, vote.into()))
						.unzip();

			let group = match self.table_context.groups.get(&self.assignment) {
				Some(group) => group,
				None => continue,
			};

			let mut validator_indices = BitVec::with_capacity(
				group.len()
			);

			validator_indices.resize(group.len(), false);

			for id in ids.iter() {
				if let Some(position) = group.iter().position(|x| x == id) {
					validator_indices.set(position, true);
				}
			}

			let backed = BackedCandidate {
				candidate,
				validity_votes,
				validator_indices,
			};

			res.push(NewBackedCandidate(backed.clone()));
		}

		res
	}

	/// Check if there have happened any new misbehaviors and issue necessary messages.
	///
	/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
	async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
		let mut reports = Vec::new();

		for (k, v) in self.table.get_misbehavior().iter() {
			if !self.reported_misbehavior_for.contains(k) {
				self.reported_misbehavior_for.insert(*k);

				let f = FromTableMisbehavior {
					id: *k,
					report: v.clone(),
					signing_context: self.table_context.signing_context.clone(),
					key: self.table_context.validators[*k as usize].clone(),
				};

				if let Ok(report) = MisbehaviorReport::try_from(f) {
					let message = ProvisionerMessage::ProvisionableData(
						ProvisionableData::MisbehaviorReport(self.parent, report)
					);

					reports.push(message);
				}
			}
		}

		for report in reports.drain(..) {
			self.send_to_provisioner(report).await?
		}

		Ok(())
	}

	/// Import a statement into the statement table and return the summary of the import.
	async fn import_statement(
		&mut self,
		statement: &SignedFullStatement,
	) -> Result<Option<TableSummary>, Error> {
		let stmt = primitive_statement_to_table(statement);

		let summary = self.table.import_statement(&self.table_context, stmt);

		self.issue_new_misbehaviors().await?;

		return Ok(summary);
	}

	async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
		match msg {
			CandidateBackingMessage::Second(_, candidate, pov) => {
				// Sanity check that candidate is from our assignment.
				if candidate.descriptor().para_id != self.assignment {
					return Ok(());
				}

				// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
				// Seconded statement only if we have not seconded any other candidate and
				// have not signed a Valid statement for the requested candidate.
				match self.seconded {
					// This job has not seconded a candidate yet.
					None => {
						let candidate_hash = candidate.hash();

						if !self.issued_statements.contains(&candidate_hash) {
							if let Ok(true) = self.validate_and_second(
								&candidate,
								pov,
							).await {
								self.seconded = Some(candidate_hash);
							}
						}
					}
					// This job has already seconded a candidate.
					Some(_) => {}
				}
			}
			CandidateBackingMessage::Statement(_, statement) => {
				self.check_statement_signature(&statement)?;
				self.maybe_validate_and_import(statement).await?;
			}
			CandidateBackingMessage::GetBackedCandidates(_, tx) => {
				let backed = self.get_backed();

				tx.send(backed).map_err(|_| oneshot::Canceled)?;
			}
		}

		Ok(())
	}

	/// Kick off validation work and distribute the result as a signed statement.
	async fn kick_off_validation_work(
		&mut self,
		summary: TableSummary,
	) -> Result<(), Error> {
		let candidate_hash = summary.candidate.clone();

		if self.issued_statements.contains(&candidate_hash) {
			return Ok(())
		}

		// We clone the commitments here because there are borrowck
		// errors relating to this being a struct and methods borrowing the entirety of self
		// and not just those things that the function uses.
		let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?;
		let expected_commitments = candidate.commitments.clone();

		let descriptor = candidate.descriptor().clone();
		let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
		let v = self.request_candidate_validation(descriptor, pov.clone()).await?;

		let statement = match v {
			ValidationResult::Valid(outputs) => {
				// If validation produces a new set of commitments, we vote the candidate as invalid.
				let commitments_check = self.make_pov_available(
					(&*pov).clone(),
					outputs,
					|commitments| if commitments == expected_commitments {
						Ok(())
					} else {
						Err(())
					}
				).await?;

				match commitments_check {
					Ok(()) => Statement::Valid(candidate_hash),
					Err(()) => Statement::Invalid(candidate_hash),
				}
			}
			ValidationResult::Invalid => {
				Statement::Invalid(candidate_hash)
			}
		};

		self.issued_statements.insert(candidate_hash);

		if let Some(signed_statement) = self.sign_statement(statement) {
			self.distribute_signed_statement(signed_statement).await?;
		}

		Ok(())
	}

	/// Import the statement and kick off validation work if it is a part of our assignment.
	async fn maybe_validate_and_import(
		&mut self,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		if let Some(summary) = self.import_statement(&statement).await? {
			if let Statement::Seconded(_) = statement.payload() {
				if summary.group_id == self.assignment {
					self.kick_off_validation_work(summary).await?;
				}
			}
		}

		Ok(())
	}

	fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
		let local_index = self.table_context.local_index()?;

		let signing_key = self.table_context.key.as_ref()?;

		let signed_statement = SignedFullStatement::sign(
			statement,
			&self.table_context.signing_context,
			local_index,
			signing_key,
		);

		Some(signed_statement)
	}

	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
		let idx = statement.validator_index() as usize;

		if self.table_context.validators.len() > idx {
			statement.check_signature(
				&self.table_context.signing_context,
				&self.table_context.validators[idx],
			).map_err(|_| Error::InvalidSignature)?;
		} else {
			return Err(Error::InvalidSignature);
		}

		Ok(())
	}

	async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
		self.tx_from.send(FromJob::Provisioner(msg)).await?;

		Ok(())
	}

	async fn request_pov_from_distribution(
		&mut self,
		descriptor: CandidateDescriptor,
	) -> Result<Arc<PoV>, Error> {
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::PoVDistribution(
			PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
		)).await?;

		Ok(rx.await?)
	}

	async fn request_candidate_validation(
		&mut self,
		candidate: CandidateDescriptor,
		pov: Arc<PoV>,
	) -> Result<ValidationResult, Error> {
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::CandidateValidation(
				CandidateValidationMessage::ValidateFromChainState(
					candidate,
					pov,
					tx,
				)
			)
		).await?;

		Ok(rx.await??)
	}

	async fn store_chunk(
		&mut self,
		id: ValidatorIndex,
		chunk: ErasureChunk,
	) -> Result<(), Error> {
		self.tx_from.send(FromJob::AvailabilityStore(
				AvailabilityStoreMessage::StoreChunk(self.parent, id, chunk)
			)
		).await?;

		Ok(())
	}

	// Compute the erasure-coding and make it available.
	//
	// This calls an inspection function before making the PoV available for any last checks
	// that need to be done. If the inspection function returns an error, this function returns
	// early without making the PoV available.
	async fn make_pov_available<T, E>(
		&mut self,
		pov: PoV,
		outputs: ValidationOutputs,
		with_commitments: impl FnOnce(CandidateCommitments) -> Result<T, E>,
	) -> Result<Result<T, E>, Error> {
		let omitted_validation = OmittedValidationData {
			global_validation: outputs.global_validation_schedule,
			local_validation: outputs.local_validation_data,
		};

		let available_data = AvailableData {
			pov,
			omitted_validation,
		};

		let chunks = erasure_coding::obtain_chunks_v1(
			self.table_context.validators.len(),
			&available_data,
		)?;

		let branches = erasure_coding::branches(chunks.as_ref());
		let erasure_root = branches.root();

		let commitments = CandidateCommitments {
			fees: outputs.fees,
			upward_messages: outputs.upward_messages,
			erasure_root,
			new_validation_code: outputs.new_validation_code,
			head_data: outputs.head_data,
		};

		let res = match with_commitments(commitments) {
			Ok(x) => x,
			Err(e) => return Ok(Err(e)),
		};

		for (index, (proof, chunk)) in branches.enumerate() {
			let chunk = ErasureChunk {
				chunk: chunk.to_vec(),
				index: index as u32,
				proof,
			};

			self.store_chunk(index as ValidatorIndex, chunk).await?;
		}

		Ok(Ok(res))
	}

	async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
		let smsg = StatementDistributionMessage::Share(self.parent, s);

		self.tx_from.send(FromJob::StatementDistribution(smsg)).await?;

		Ok(())
	}
}

struct JobHandle {
	abort_handle: future::AbortHandle,
	to_job: mpsc::Sender<ToJob>,
	finished: oneshot::Receiver<()>,
	su_handle: usize,
}

impl JobHandle {
	async fn stop(mut self) {
		let _ = self.to_job.send(ToJob::Stop).await;
		let stop_timer = Delay::new(Duration::from_secs(1));

		match future::select(stop_timer, self.finished).await {
			Either::Left((_, _)) => {
			},
			Either::Right((_, _)) => {
				self.abort_handle.abort();
			},
		}
	}

	async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
		Ok(self.to_job.send(msg).await?)
	}
}

struct Jobs<S> {
	spawner: S,
	running: HashMap<Hash, JobHandle>,
	outgoing_msgs: StreamUnordered<mpsc::Receiver<FromJob>>,
}

async fn run_job(
	parent: Hash,
	keystore: KeyStorePtr,
	rx_to: mpsc::Receiver<ToJob>,
	mut tx_from: mpsc::Sender<FromJob>,
) -> Result<(), Error> {
	let (validators, roster) = futures::try_join!(
		request_validators(parent, &mut tx_from).await?,
		request_validator_groups(parent, &mut tx_from).await?,
	)?;

	let key = signing_key(&validators[..], &keystore).ok_or(Error::NotInValidatorSet)?;
	let mut groups = HashMap::new();

	for assignment in roster.scheduled {
		if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) {
			groups.insert(
				assignment.para_id,
				g.clone(),
			);
		}
	}

	let mut assignment = Default::default();

	if let Some(idx) = validators.iter().position(|k| *k == key.public()) {
		let idx = idx as u32;
		for (para_id, group) in groups.iter() {
			if group.contains(&idx) {
				assignment = *para_id;
				break;
			}
		}
	}

	let signing_context = request_signing_context(parent, &mut tx_from).await?.await?;

	let table_context = TableContext {
		signing_context,
		key: Some(key),
		groups,
		validators,
	};

	let job = CandidateBackingJob {
		parent,
		rx_to,
		tx_from,
		assignment,
		issued_statements: HashSet::new(),
		seconded: None,
		reported_misbehavior_for: HashSet::new(),
		table: Table::default(),
		table_context,
	};

	job.run().await
}

/// Request a validator set from the `RuntimeApi`.
async fn request_validators(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<Vec<ValidatorId>>, Error> {
	let (tx, rx) = oneshot::channel();

	s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
			parent,
			RuntimeApiRequest::Validators(tx),
		)
	)).await?;

	Ok(rx)
}

/// Request the scheduler roster from `RuntimeApi`.
async fn request_validator_groups(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SchedulerRoster>, Error> {
	let (tx, rx) = oneshot::channel();

	s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
			parent,
			RuntimeApiRequest::ValidatorGroups(tx),
		)
	)).await?;

	Ok(rx)
}

/// Request a `SigningContext` from the `RuntimeApi`.
async fn request_signing_context(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SigningContext>, Error> {
	let (tx, rx) = oneshot::channel();

	s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
			parent,
			RuntimeApiRequest::SigningContext(tx),
		)
	)).await?;

	Ok(rx)
}

impl<S: Spawn> Jobs<S> {
	fn new(spawner: S) -> Self {
		Self {
			spawner,
			running: HashMap::default(),
			outgoing_msgs: StreamUnordered::new(),
		}
	}

	fn spawn_job(&mut self, parent_hash: Hash, keystore: KeyStorePtr) -> Result<(), Error> {
		let (to_job_tx, to_job_rx) = mpsc::channel(CHANNEL_CAPACITY);
		let (from_job_tx, from_job_rx) = mpsc::channel(CHANNEL_CAPACITY);

		let (future, abort_handle) = future::abortable(async move {
			if let Err(e) = run_job(parent_hash, keystore, to_job_rx, from_job_tx).await {
				log::error!(
					"CandidateBackingJob({}) finished with an error {:?}",
					parent_hash,
					e,
				);
			}
		});

		let (finished_tx, finished) = oneshot::channel();

		let future = async move {
			let _ = future.await;
			let _ = finished_tx.send(());
		};
		self.spawner.spawn(future)?;

		let su_handle = self.outgoing_msgs.push(from_job_rx);

		let handle = JobHandle {
			abort_handle,
			to_job: to_job_tx,
			finished,
			su_handle,
		};

		self.running.insert(parent_hash, handle);

		Ok(())
	}

	async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> {
		match self.running.remove(&parent_hash) {
			Some(handle) => {
				Pin::new(&mut self.outgoing_msgs).remove(handle.su_handle);
				handle.stop().await;
				Ok(())
			}
			None => Err(Error::JobNotFound(parent_hash))
		}
	}

	async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) -> Result<(), Error> {
		if let Some(job) = self.running.get_mut(&parent_hash) {
			job.send_msg(msg).await?;
		}
		Ok(())
	}

	async fn next(&mut self) -> Option<FromJob> {
		self.outgoing_msgs.next().await.and_then(|(e, _)| match e {
			StreamYield::Item(e) => Some(e),
			_ => None,
		})
	}
}

/// An implementation of the Candidate Backing subsystem.
pub struct CandidateBackingSubsystem<S, Context> {
	spawner: S,
	keystore: KeyStorePtr,
	_context: std::marker::PhantomData<Context>,
}

impl<S, Context> CandidateBackingSubsystem<S, Context>
	where
		S: Spawn + Clone,
		Context: SubsystemContext<Message=CandidateBackingMessage>,
{
	/// Creates a new `CandidateBackingSubsystem`.
	pub fn new(keystore: KeyStorePtr, spawner: S) -> Self {
		Self {
			spawner,
			keystore,
			_context: std::marker::PhantomData,
		}
	}

	async fn run(
		mut ctx: Context,
		keystore: KeyStorePtr,
		spawner: S,
	) {
		let mut jobs = Jobs::new(spawner.clone());

		loop {
			select! {
				incoming = ctx.recv().fuse() => {
					match incoming {
						Ok(msg) => match msg {
							FromOverseer::Signal(OverseerSignal::StartWork(hash)) => {
								if let Err(e) = jobs.spawn_job(hash, keystore.clone()) {
									log::error!("Failed to spawn a job: {:?}", e);
									break;
								}
							}
							FromOverseer::Signal(OverseerSignal::StopWork(hash)) => {
								if let Err(e) = jobs.stop_job(hash).await {
									log::error!("Failed to spawn a job: {:?}", e);
									break;
								}
							}
							FromOverseer::Communication { msg } => {
								match msg {
									CandidateBackingMessage::Second(hash, _, _) |
									CandidateBackingMessage::Statement(hash, _) |
									CandidateBackingMessage::GetBackedCandidates(hash, _) => {
										let res = jobs.send_msg(
											hash.clone(),
											ToJob::CandidateBacking(msg),
										).await;

										if let Err(e) = res {
											log::error!(
												"Failed to send a message to a job: {:?}",
												e,
											);

											break;
										}
									}
									_ => (),
								}
							}
							_ => (),
						},
						Err(_) => break,
					}
				}
				outgoing = jobs.next().fuse() => {
					match outgoing {
						Some(msg) => {
							let _ = ctx.send_message(msg.into()).await;
						}
						None => break,
					}
				}
				complete => break,
			}
		}
	}
}

impl<S, Context> Subsystem<Context> for CandidateBackingSubsystem<S, Context>
	where
		S: Spawn + Send + Clone + 'static,
		Context: SubsystemContext<Message=CandidateBackingMessage>,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let keystore = self.keystore.clone();
		let spawner = self.spawner.clone();

		SpawnedSubsystem(Box::pin(async move {
			Self::run(ctx, keystore, spawner).await;
		}))
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use futures::{Future, executor::{self, ThreadPool}};
	use std::collections::HashMap;
	use std::sync::Arc;
	use sp_keyring::Sr25519Keyring;
	use polkadot_primitives::v1::{
		AssignmentKind, CollatorId, CoreAssignment, BlockData, CoreIndex, GroupIndex, ValidityAttestation,
		CandidateCommitments, LocalValidationData, GlobalValidationSchedule, HeadData,
	};
	use assert_matches::assert_matches;

	fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
		val_ids.iter().map(|v| v.public().into()).collect()
	}

	struct TestState {
		chain_ids: Vec<ParaId>,
		keystore: KeyStorePtr,
		validators: Vec<Sr25519Keyring>,
		validator_public: Vec<ValidatorId>,
		global_validation_schedule: GlobalValidationSchedule,
		local_validation_data: LocalValidationData,
		roster: SchedulerRoster,
		head_data: HashMap<ParaId, HeadData>,
		signing_context: SigningContext,
		relay_parent: Hash,
	}

	impl Default for TestState {
		fn default() -> Self {
			let chain_a = ParaId::from(1);
			let chain_b = ParaId::from(2);
			let thread_a = ParaId::from(3);

			let chain_ids = vec![chain_a, chain_b, thread_a];

			let validators = vec![
				Sr25519Keyring::Alice,
				Sr25519Keyring::Bob,
				Sr25519Keyring::Charlie,
				Sr25519Keyring::Dave,
				Sr25519Keyring::Ferdie,
			];

			let keystore = keystore::Store::new_in_memory();
			// Make sure `Alice` key is in the keystore, so this mocked node will be a parachain validator.
			keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&validators[0].to_seed())
				.expect("Insert key into keystore");

			let validator_public = validator_pubkeys(&validators);

			let chain_a_assignment = CoreAssignment {
				core: CoreIndex::from(0),
				para_id: chain_a,
				kind: AssignmentKind::Parachain,
				group_idx: GroupIndex::from(0),
			};

			let chain_b_assignment = CoreAssignment {
				core: CoreIndex::from(1),
				para_id: chain_b,
				kind: AssignmentKind::Parachain,
				group_idx: GroupIndex::from(1),
			};

			let thread_collator: CollatorId = Sr25519Keyring::Two.public().into();

			let thread_a_assignment = CoreAssignment {
				core: CoreIndex::from(2),
				para_id: thread_a,
				kind: AssignmentKind::Parathread(thread_collator.clone(), 0),
				group_idx: GroupIndex::from(2),
			};

			let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];

			let parent_hash_1 = [1; 32].into();

			let roster = SchedulerRoster {
				validator_groups,
				scheduled: vec![
					chain_a_assignment,
					chain_b_assignment,
					thread_a_assignment,
				],
				upcoming: vec![],
				availability_cores: vec![],
			};
			let signing_context = SigningContext {
				session_index: 1,
				parent_hash: parent_hash_1,
			};

			let mut head_data = HashMap::new();
			head_data.insert(chain_a, HeadData(vec![4, 5, 6]));

			let relay_parent = Hash::from([5; 32]);

			let local_validation_data = LocalValidationData {
				parent_head: HeadData(vec![7, 8, 9]),
				balance: Default::default(),
				code_upgrade_allowed: None,
				validation_code_hash: Default::default(),
			};

			let global_validation_schedule = GlobalValidationSchedule {
				max_code_size: 1000,
				max_head_data_size: 1000,
				block_number: Default::default(),
			};

			Self {
				chain_ids,
				keystore,
				validators,
				validator_public,
				roster,
				head_data,
				local_validation_data,
				global_validation_schedule,
				signing_context,
				relay_parent,
			}
		}
	}

	struct TestHarness {
		virtual_overseer: subsystem_test::TestSubsystemContextHandle<CandidateBackingMessage>,
	}

	fn test_harness<T: Future<Output=()>>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) {
		let pool = ThreadPool::new().unwrap();

		let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone());

		let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone());

		let test_fut = test(TestHarness {
			virtual_overseer,
		});

		futures::pin_mut!(test_fut);
		futures::pin_mut!(subsystem);

		executor::block_on(future::select(test_fut, subsystem));
	}

	fn make_erasure_root(test: &TestState, pov: PoV) -> Hash {
		let omitted_validation = OmittedValidationData {
			global_validation: test.global_validation_schedule.clone(),
			local_validation: test.local_validation_data.clone(),
		};

		let available_data = AvailableData {
			omitted_validation,
			pov,
		};

		let chunks = erasure_coding::obtain_chunks_v1(test.validators.len(), &available_data).unwrap();
		erasure_coding::branches(&chunks).root()
	}

	#[derive(Default)]
	struct TestCandidateBuilder {
		para_id: ParaId,
		head_data: HeadData,
		pov_hash: Hash,
		relay_parent: Hash,
		erasure_root: Hash,
	}

	impl TestCandidateBuilder {
		fn build(self) -> CommittedCandidateReceipt {
			CommittedCandidateReceipt {
				descriptor: CandidateDescriptor {
					para_id: self.para_id,
					pov_hash: self.pov_hash,
					relay_parent: self.relay_parent,
					..Default::default()
				},
				commitments: CandidateCommitments {
					head_data: self.head_data,
					erasure_root: self.erasure_root,
					..Default::default()
				},
			}
		}
	}

	// Tests that the subsystem performs actions that are requied on startup.
	async fn test_startup(
		virtual_overseer: &mut subsystem_test::TestSubsystemContextHandle<CandidateBackingMessage>,
		test_state: &TestState,
	) {
		// Start work on some new parent.
		virtual_overseer.send(FromOverseer::Signal(
			OverseerSignal::StartWork(test_state.relay_parent))
		).await;

		// Check that subsystem job issues a request for a validator set.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::RuntimeApi(
				RuntimeApiMessage::Request(parent, RuntimeApiRequest::Validators(tx))
			) if parent == test_state.relay_parent => {
				tx.send(test_state.validator_public.clone()).unwrap();
			}
		);

		// Check that subsystem job issues a request for the validator groups.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::RuntimeApi(
				RuntimeApiMessage::Request(parent, RuntimeApiRequest::ValidatorGroups(tx))
			) if parent == test_state.relay_parent => {
				tx.send(test_state.roster.clone()).unwrap();
			}
		);

		// Check that subsystem job issues a request for the signing context.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::RuntimeApi(
				RuntimeApiMessage::Request(parent, RuntimeApiRequest::SigningContext(tx))
			) if parent == test_state.relay_parent => {
				tx.send(test_state.signing_context.clone()).unwrap();
			}
		);
	}

	// Test that a `CandidateBackingMessage::Second` issues validation work
	// and in case validation is successful issues a `StatementDistributionMessage`.
	#[test]
	fn backing_second_works() {
		let test_state = TestState::default();
		test_harness(test_state.keystore.clone(), |test_harness| async move {
			let TestHarness { mut virtual_overseer } = test_harness;

			test_startup(&mut virtual_overseer, &test_state).await;

			let pov = PoV {
				block_data: BlockData(vec![42, 43, 44]),
			};

			let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

			let pov_hash = pov.hash();
			let candidate = TestCandidateBuilder {
				para_id: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_hash,
				head_data: expected_head_data.clone(),
				erasure_root: make_erasure_root(&test_state, pov.clone()),
				..Default::default()
			}.build();

			let second = CandidateBackingMessage::Second(
				test_state.relay_parent,
				candidate.to_plain(),
				pov.clone(),
			);

			virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;


			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::ValidateFromChainState(
						c,
						pov,
						tx,
					)
				) if pov == pov && &c == candidate.descriptor() => {
					tx.send(Ok(
						ValidationResult::Valid(ValidationOutputs {
							global_validation_schedule: test_state.global_validation_schedule,
							local_validation_data: test_state.local_validation_data,
							head_data: expected_head_data.clone(),
							upward_messages: Vec::new(),
							fees: Default::default(),
							new_validation_code: None,
						}),
					)).unwrap();
				}
			);

			for _ in 0..test_state.validators.len() {
				assert_matches!(
					virtual_overseer.recv().await,
					AllMessages::AvailabilityStore(
						AvailabilityStoreMessage::StoreChunk(parent_hash, _, _)
					) if parent_hash == test_state.relay_parent
				);
			}

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::StatementDistribution(
					StatementDistributionMessage::Share(
						parent_hash,
						signed_statement,
					)
				) if parent_hash == test_state.relay_parent => {
					signed_statement.check_signature(
						&test_state.signing_context,
						&test_state.validator_public[0],
					).unwrap();
				}
			);

			virtual_overseer.send(FromOverseer::Signal(
				OverseerSignal::StopWork(test_state.relay_parent))
			).await;
		});
	}

	// Test that the candidate reaches quorum succesfully.
	#[test]
	fn backing_works() {
		let test_state = TestState::default();
		test_harness(test_state.keystore.clone(), |test_harness| async move {
			let TestHarness { mut virtual_overseer } = test_harness;

			test_startup(&mut virtual_overseer, &test_state).await;

			let pov = PoV {
				block_data: BlockData(vec![1, 2, 3]),
			};

			let pov_hash = pov.hash();

			let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

			let candidate_a = TestCandidateBuilder {
				para_id: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_hash,
				head_data: expected_head_data.clone(),
				erasure_root: make_erasure_root(&test_state, pov.clone()),
				..Default::default()
			}.build();

			let candidate_a_hash = candidate_a.hash();

			let signed_a = SignedFullStatement::sign(
				Statement::Seconded(candidate_a.clone()),
				&test_state.signing_context,
				2,
				&test_state.validators[2].pair().into(),
			);

			let signed_b = SignedFullStatement::sign(
				Statement::Valid(candidate_a_hash),
				&test_state.signing_context,
				0,
				&test_state.validators[0].pair().into(),
			);

			let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			// Sending a `Statement::Seconded` for our assignment will start
			// validation process. The first thing requested is PoV from the
			// `PoVDistribution`.
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::PoVDistribution(
					PoVDistributionMessage::FetchPoV(relay_parent, _, tx)
				) if relay_parent == test_state.relay_parent => {
					tx.send(Arc::new(pov.clone())).unwrap();
				}
			);

			// The next step is the actual request to Validation subsystem
			// to validate the `Seconded` candidate.
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::ValidateFromChainState(
						c,
						pov,
						tx,
					)
				) if pov == pov && &c == candidate_a.descriptor() => {
					tx.send(Ok(
						ValidationResult::Valid(ValidationOutputs {
							global_validation_schedule: test_state.global_validation_schedule,
							local_validation_data: test_state.local_validation_data,
							head_data: expected_head_data.clone(),
							upward_messages: Vec::new(),
							fees: Default::default(),
							new_validation_code: None,
						}),
					)).unwrap();
				}
			);

			let statement = CandidateBackingMessage::Statement(
				test_state.relay_parent,
				signed_b.clone(),
			);

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			let (tx, rx) = oneshot::channel();

			// The backed candidats set should be not empty at this point.
			virtual_overseer.send(FromOverseer::Communication{
				msg: CandidateBackingMessage::GetBackedCandidates(
					test_state.relay_parent,
					tx,
				)
			}).await;

			let backed = rx.await.unwrap();

			// `validity_votes` may be in any order so we can't do this in a single assert.
			assert_eq!(backed[0].0.candidate, candidate_a);
			assert_eq!(backed[0].0.validity_votes.len(), 2);
			assert!(backed[0].0.validity_votes.contains(
				&ValidityAttestation::Explicit(signed_b.signature().clone())
			));
			assert!(backed[0].0.validity_votes.contains(
				&ValidityAttestation::Implicit(signed_a.signature().clone())
			));
			assert_eq!(backed[0].0.validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0]);

			virtual_overseer.send(FromOverseer::Signal(
				OverseerSignal::StopWork(test_state.relay_parent))
			).await;
		});
	}

	// Issuing conflicting statements on the same candidate should
	// be a misbehavior.
	#[test]
	fn backing_misbehavior_works() {
		let test_state = TestState::default();
		test_harness(test_state.keystore.clone(), |test_harness| async move {
			let TestHarness { mut virtual_overseer } = test_harness;

			test_startup(&mut virtual_overseer, &test_state).await;

			let pov = PoV {
				block_data: BlockData(vec![1, 2, 3]),
			};

			let pov_hash = pov.hash();

			let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

			let candidate_a = TestCandidateBuilder {
				para_id: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_hash,
				erasure_root: make_erasure_root(&test_state, pov.clone()),
				head_data: expected_head_data.clone(),
				..Default::default()
			}.build();

			let candidate_a_hash = candidate_a.hash();

			let signed_a = SignedFullStatement::sign(
				Statement::Seconded(candidate_a.clone()),
				&test_state.signing_context,
				2,
				&test_state.validators[2].pair().into(),
			);

			let signed_b = SignedFullStatement::sign(
				Statement::Valid(candidate_a_hash),
				&test_state.signing_context,
				0,
				&test_state.validators[0].pair().into(),
			);

			let signed_c = SignedFullStatement::sign(
				Statement::Invalid(candidate_a_hash),
				&test_state.signing_context,
				0,
				&test_state.validators[0].pair().into(),
			);

			let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::PoVDistribution(
					PoVDistributionMessage::FetchPoV(relay_parent, _, tx)
				) if relay_parent == test_state.relay_parent => {
					tx.send(Arc::new(pov.clone())).unwrap();
				}
			);

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::ValidateFromChainState(
						c,
						pov,
						tx,
					)
				) if pov == pov && &c == candidate_a.descriptor() => {
					tx.send(Ok(
						ValidationResult::Valid(ValidationOutputs {
							global_validation_schedule: test_state.global_validation_schedule,
							local_validation_data: test_state.local_validation_data,
							head_data: expected_head_data.clone(),
							upward_messages: Vec::new(),
							fees: Default::default(),
							new_validation_code: None,
						}),
					)).unwrap();
				}
			);

			for _ in 0..test_state.validators.len() {
				assert_matches!(
					virtual_overseer.recv().await,
					AllMessages::AvailabilityStore(
						AvailabilityStoreMessage::StoreChunk(parent_hash, _, _)
					) if parent_hash == test_state.relay_parent
				);
			}

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::StatementDistribution(
					StatementDistributionMessage::Share(
						relay_parent,
						signed_statement,
					)
				) if relay_parent == test_state.relay_parent => {
					signed_statement.check_signature(
						&test_state.signing_context,
						&test_state.validator_public[0],
					).unwrap();

					assert_eq!(*signed_statement.payload(), Statement::Valid(candidate_a_hash));
				}
			);

			let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_b.clone());

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_c.clone());

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::Provisioner(
					ProvisionerMessage::ProvisionableData(
						ProvisionableData::MisbehaviorReport(
							relay_parent,
							MisbehaviorReport::SelfContradiction(_, s1, s2),
						)
					)
				) if relay_parent == test_state.relay_parent => {
					s1.check_signature(
						&test_state.signing_context,
						&test_state.validator_public[s1.validator_index() as usize],
					).unwrap();

					s2.check_signature(
						&test_state.signing_context,
						&test_state.validator_public[s2.validator_index() as usize],
					).unwrap();
				}
			);

		});
	}

	// Test that if we are asked to second an invalid candidate we
	// can still second a valid one afterwards.
	#[test]
	fn backing_dont_second_invalid() {
		let test_state = TestState::default();
		test_harness(test_state.keystore.clone(), |test_harness| async move {
			let TestHarness { mut virtual_overseer } = test_harness;

			test_startup(&mut virtual_overseer, &test_state).await;

			let pov_block_a = PoV {
				block_data: BlockData(vec![42, 43, 44]),
			};

			let pov_block_b = PoV {
				block_data: BlockData(vec![45, 46, 47]),
			};

			let pov_hash_a = pov_block_a.hash();
			let pov_hash_b = pov_block_b.hash();

			let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

			let candidate_a = TestCandidateBuilder {
				para_id: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_hash: pov_hash_a,
				erasure_root: make_erasure_root(&test_state, pov_block_a.clone()),
				..Default::default()
			}.build();

			let candidate_b = TestCandidateBuilder {
				para_id: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_hash: pov_hash_b,
				erasure_root: make_erasure_root(&test_state, pov_block_b.clone()),
				head_data: expected_head_data.clone(),
				..Default::default()
			}.build();

			let second = CandidateBackingMessage::Second(
				test_state.relay_parent,
				candidate_a.to_plain(),
				pov_block_a.clone(),
			);

			virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;


			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::ValidateFromChainState(
						c,
						pov,
						tx,
					)
				) if pov == pov && &c == candidate_a.descriptor() => {
					tx.send(Ok(ValidationResult::Invalid)).unwrap();
				}
			);

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateSelection(
					CandidateSelectionMessage::Invalid(parent_hash, c)
				) if parent_hash == test_state.relay_parent && c == candidate_a.to_plain()
			);

			let second = CandidateBackingMessage::Second(
				test_state.relay_parent,
				candidate_b.to_plain(),
				pov_block_b.clone(),
			);

			virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

			let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::ValidateFromChainState(
						c,
						pov,
						tx,
					)
				) if pov == pov && &c == candidate_b.descriptor() => {
					tx.send(Ok(
						ValidationResult::Valid(ValidationOutputs {
							global_validation_schedule: test_state.global_validation_schedule,
							local_validation_data: test_state.local_validation_data,
							head_data: expected_head_data.clone(),
							upward_messages: Vec::new(),
							fees: Default::default(),
							new_validation_code: None,
						}),
					)).unwrap();
				}
			);

			for _ in 0..test_state.validators.len() {
				assert_matches!(
					virtual_overseer.recv().await,
					AllMessages::AvailabilityStore(
						AvailabilityStoreMessage::StoreChunk(parent_hash, _, _)
					) if parent_hash == test_state.relay_parent
				);
			}

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::StatementDistribution(
					StatementDistributionMessage::Share(
						parent_hash,
						signed_statement,
					)
				) if parent_hash == test_state.relay_parent => {
					signed_statement.check_signature(
						&test_state.signing_context,
						&test_state.validator_public[0],
					).unwrap();

					assert_eq!(*signed_statement.payload(), Statement::Seconded(candidate_b));
				}
			);

			virtual_overseer.send(FromOverseer::Signal(
				OverseerSignal::StopWork(test_state.relay_parent))
			).await;
		});
	}

	// Test that if we have already issued a statement (in this case `Invalid`) about a
	// candidate we will not be issuing a `Seconded` statement on it.
	#[test]
	fn backing_multiple_statements_work() {
		let test_state = TestState::default();
		test_harness(test_state.keystore.clone(), |test_harness| async move {
			let TestHarness { mut virtual_overseer } = test_harness;

			test_startup(&mut virtual_overseer, &test_state).await;

			let pov = PoV {
				block_data: BlockData(vec![42, 43, 44]),
			};

			let pov_hash = pov.hash();

			let candidate = TestCandidateBuilder {
				para_id: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_hash,
				erasure_root: make_erasure_root(&test_state, pov.clone()),
				..Default::default()
			}.build();

			let candidate_hash = candidate.hash();

			let signed_a = SignedFullStatement::sign(
				Statement::Seconded(candidate.clone()),
				&test_state.signing_context,
				2,
				&test_state.validators[2].pair().into(),
			);

			// Send in a `Statement` with a candidate.
			let statement = CandidateBackingMessage::Statement(
				test_state.relay_parent,
				signed_a.clone(),
			);

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			// Subsystem requests PoV and requests validation.
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::PoVDistribution(
					PoVDistributionMessage::FetchPoV(relay_parent, _, tx)
				) => {
					assert_eq!(relay_parent, test_state.relay_parent);
					tx.send(Arc::new(pov.clone())).unwrap();
				}
			);


			// Tell subsystem that this candidate is invalid.
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::ValidateFromChainState(
						c,
						pov,
						tx,
					)
				) if pov == pov && &c == candidate.descriptor() => {
					tx.send(Ok(ValidationResult::Invalid)).unwrap();
				}
			);

			// The invalid message is shared.
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::StatementDistribution(
					StatementDistributionMessage::Share(
						relay_parent,
						signed_statement,
					)
				) => {
					assert_eq!(relay_parent, test_state.relay_parent);
					signed_statement.check_signature(
						&test_state.signing_context,
						&test_state.validator_public[0],
					).unwrap();
					assert_eq!(*signed_statement.payload(), Statement::Invalid(candidate_hash));
				}
			);

			// Ask subsystem to `Second` a candidate that already has a statement issued about.
			// This should emit no actions from subsystem.
			let second = CandidateBackingMessage::Second(
				test_state.relay_parent,
				candidate.to_plain(),
				pov.clone(),
			);

			virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

			let pov_to_second = PoV {
				block_data: BlockData(vec![3, 2, 1]),
			};

			let pov_hash = pov_to_second.hash();

			let candidate_to_second = TestCandidateBuilder {
				para_id: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_hash,
				erasure_root: make_erasure_root(&test_state, pov_to_second.clone()),
				..Default::default()
			}.build();

			let second = CandidateBackingMessage::Second(
				test_state.relay_parent,
				candidate_to_second.to_plain(),
				pov_to_second.clone(),
			);

			// In order to trigger _some_ actions from subsystem ask it to second another
			// candidate. The only reason to do so is to make sure that no actions were
			// triggered on the prev step.
			virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::ValidateFromChainState(
						_,
						pov,
						_,
					)
				) => {
					assert_eq!(&*pov, &pov_to_second);
				}
			);
		});
	}
}
